package com.atguigu.Flink.datastream.transform;

import com.atguigu.Flink.POJO.Event;
import com.atguigu.Flink.function.ClickSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demo job showing user-defined (custom) partitioning on a Flink {@code DataStream}.
 *
 * <p>Events produced by {@link ClickSource} are keyed by their user name and routed by
 * {@link UserPartitioner}: the users {@code "otto"} and {@code "xuanDog"} are sent to
 * partition 0, every other user to partition 1. The routed stream is printed by a sink
 * running with parallelism 2, so both partitions exist.
 */
public class Flink06_UserDefinePartition {

    /** Partition index that receives the events of the specially routed users. */
    private static final int SPECIAL_PARTITION = 0;

    /** Preferred partition index for all remaining users. */
    private static final int DEFAULT_PARTITION = 1;

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws RuntimeException if the job execution fails; the original exception is the cause
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> ds = env.addSource(new ClickSource());

        // Implement custom partitioning: route each event by its user name, then print
        // with two parallel sink subtasks so that partitions 0 and 1 both exist.
        ds.partitionCustom(new UserPartitioner(), new UserKeySelector())
                .print()
                .setParallelism(2);

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException("Failed to execute the custom partitioning job", e);
        }
    }

    /**
     * Routes "otto" and "xuanDog" to {@link #SPECIAL_PARTITION} and everyone else to
     * {@link #DEFAULT_PARTITION}, clamped to the number of partitions actually available.
     */
    private static final class UserPartitioner implements Partitioner<String> {

        private static final long serialVersionUID = 1L;

        /**
         * @param user          the key extracted from the event; may be {@code null}
         * @param numPartitions number of downstream partitions
         * @return the target partition index, always within {@code [0, numPartitions)}
         */
        @Override
        public int partition(String user, int numPartitions) {
            // Constant-first equals keeps a null user from throwing; it is treated
            // like any other non-special user.
            if ("otto".equals(user) || "xuanDog".equals(user)) {
                return SPECIAL_PARTITION;
            }
            // Never return an index outside the available partitions: with a single
            // downstream subtask everything has to go to partition 0.
            return Math.min(DEFAULT_PARTITION, numPartitions - 1);
        }
    }

    /** Extracts the user name of an event as the partitioning key. */
    private static final class UserKeySelector implements KeySelector<Event, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Event event) throws Exception {
            return event.getUser();
        }
    }
}
